package v1

import (
	"fmt"
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/behaviour"
	bc "github.com/tendermint/tendermint/blockchain"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/store"
	"github.com/tendermint/tendermint/types"
)

const (
	// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
	BlockchainChannel = byte(0x40)
	trySyncIntervalMS = 10
	trySendIntervalMS = 10

	// ask for best height every 10s
	statusUpdateIntervalSeconds = 10
)

var (
	// Maximum number of requests that can be pending per peer, i.e. for which requests have been sent but blocks
	// have not been received.
	maxRequestsPerPeer = 20
	// Maximum number of block requests for the reactor, pending or for which blocks have been received.
	maxNumRequests = 64
)

type consensusReactor interface {
	// for when we switch from blockchain reactor and fast sync to
	// the consensus machine
	SwitchToConsensus(state sm.State, skipWAL bool)
}

// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
	p2p.BaseReactor

	initialState sm.State // immutable
	state        sm.State

	blockExec *sm.BlockExecutor
	store     *store.BlockStore

	fastSync    bool
	stateSynced bool

	fsm          *BcReactorFSM
	blocksSynced uint64

	// Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine.
	messagesForFSMCh chan bcReactorMessage

	// Switch goroutine may send RemovePeer to the blockchain reactor. This is an error message that is relayed
	// to this channel to be processed in the context of the poolRoutine.
	errorsForFSMCh chan bcReactorMessage

	// This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and
	// the switch.
	eventsFromFSMCh chan bcFsmMessage

	swReporter *behaviour.SwitchReporter
}

// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
	fastSync bool) *BlockchainReactor {

	if state.LastBlockHeight != store.Height() {
		panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
			store.Height()))
	}

	const capacity = 1000
	eventsFromFSMCh := make(chan bcFsmMessage, capacity)
	messagesForFSMCh := make(chan bcReactorMessage, capacity)
	errorsForFSMCh := make(chan bcReactorMessage, capacity)

	startHeight := store.Height() + 1
	if startHeight == 1 {
		startHeight = state.InitialHeight
	}
	bcR := &BlockchainReactor{
		initialState:     state,
		state:            state,
		blockExec:        blockExec,
		fastSync:         fastSync,
		store:            store,
		messagesForFSMCh: messagesForFSMCh,
		eventsFromFSMCh:  eventsFromFSMCh,
		errorsForFSMCh:   errorsForFSMCh,
	}
	fsm := NewFSM(startHeight, bcR)
	bcR.fsm = fsm
	bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
	// bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)

	return bcR
}

// bcReactorMessage is used by the reactor to send messages to the FSM.
type bcReactorMessage struct {
	event bReactorEvent
	data  bReactorEventData
}

type bFsmEvent uint

const (
	// message type events
	peerErrorEv = iota + 1
	syncFinishedEv
)

type bFsmEventData struct {
	peerID p2p.ID
	err    error
}

// bcFsmMessage is used by the FSM to send messages to the reactor
type bcFsmMessage struct {
	event bFsmEvent
	data  bFsmEventData
}

// SetLogger implements service.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
	bcR.BaseService.Logger = l
	bcR.fsm.SetLogger(l)
}

// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
	bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
	if bcR.fastSync {
		go bcR.poolRoutine()
	}
	return nil
}

// OnStop implements service.Service.
func (bcR *BlockchainReactor) OnStop() {
	_ = bcR.Stop()
}

// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
	bcR.fastSync = true
	bcR.initialState = state
	bcR.state = state
	bcR.stateSynced = true

	bcR.fsm = NewFSM(state.LastBlockHeight+1, bcR)
	bcR.fsm.SetLogger(bcR.Logger)
	go bcR.poolRoutine()
	return nil
}

// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
	return []*p2p.ChannelDescriptor{
		{
			ID:                  BlockchainChannel,
			Priority:            10,
			SendQueueCapacity:   2000,
			RecvBufferCapacity:  50 * 4096,
			RecvMessageCapacity: bc.MaxMsgSize,
			MessageType:         &bcproto.Message{},
		},
	}
}

// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
	p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
		ChannelID: BlockchainChannel,
		Message: &bcproto.StatusResponse{
			Base:   bcR.store.Base(),
			Height: bcR.store.Height(),
		},
	}, bcR.Logger)
	// it's OK if send fails. will try later in poolRoutine

	// peer is added to the pool once we receive the first
	// bcStatusResponseMessage from the peer and call pool.updatePeer()
}

// sendBlockToPeer loads a block and sends it to the requesting peer.
// If the block doesn't exist a bcNoBlockResponseMessage is sent.
// If all nodes are honest, no node should be requesting for a block that doesn't exist.
func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcproto.BlockRequest,
	src p2p.Peer) (queued bool) {

	block := bcR.store.LoadBlock(msg.Height)
	if block != nil {
		pbbi, err := block.ToProto()
		if err != nil {
			bcR.Logger.Error("Could not send block message to peer", "err", err)
			return false
		}
		return p2p.TrySendEnvelopeShim(src, p2p.Envelope{ //nolint: staticcheck
			ChannelID: BlockchainChannel,
			Message:   &bcproto.BlockResponse{Block: pbbi},
		}, bcR.Logger)
	}

	bcR.Logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height)

	return p2p.TrySendEnvelopeShim(src, p2p.Envelope{ //nolint: staticcheck
		ChannelID: BlockchainChannel,
		Message:   &bcproto.NoBlockResponse{Height: msg.Height},
	}, bcR.Logger)
}

func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcproto.StatusRequest, src p2p.Peer) (queued bool) {
	return p2p.TrySendEnvelopeShim(src, p2p.Envelope{ //nolint: staticcheck
		ChannelID: BlockchainChannel,
		Message: &bcproto.StatusResponse{
			Base:   bcR.store.Base(),
			Height: bcR.store.Height(),
		},
	}, bcR.Logger)
}

// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
	msgData := bcReactorMessage{
		event: peerRemoveEv,
		data: bReactorEventData{
			peerID: peer.ID(),
			err:    errSwitchRemovesPeer,
		},
	}
	bcR.errorsForFSMCh <- msgData
}

// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
	if err := bc.ValidateMsg(e.Message); err != nil {
		bcR.Logger.Error("peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
		_ = bcR.swReporter.Report(behaviour.BadMessage(e.Src.ID(), err.Error()))
		return
	}

	bcR.Logger.Debug("Receive", "src", e.Src, "chID", e.ChannelID, "msg", e.Message)

	switch msg := e.Message.(type) {
	case *bcproto.BlockRequest:
		if queued := bcR.sendBlockToPeer(msg, e.Src); !queued {
			// Unfortunately not queued since the queue is full.
			bcR.Logger.Error("Could not send block message to peer", "src", e.Src, "height", msg.Height)
		}

	case *bcproto.StatusRequest:
		// Send peer our state.
		if queued := bcR.sendStatusResponseToPeer(msg, e.Src); !queued {
			// Unfortunately not queued since the queue is full.
			bcR.Logger.Error("Could not send status message to peer", "src", e.Src)
		}

	case *bcproto.BlockResponse:
		bi, err := types.BlockFromProto(msg.Block)
		if err != nil {
			bcR.Logger.Error("error transition block from protobuf", "err", err)
			return
		}
		msgForFSM := bcReactorMessage{
			event: blockResponseEv,
			data: bReactorEventData{
				peerID: e.Src.ID(),
				height: bi.Height,
				block:  bi,
				length: msg.Size(),
			},
		}
		bcR.Logger.Info("Received", "src", e.Src, "height", bi.Height)
		bcR.messagesForFSMCh <- msgForFSM
	case *bcproto.NoBlockResponse:
		msgForFSM := bcReactorMessage{
			event: noBlockResponseEv,
			data: bReactorEventData{
				peerID: e.Src.ID(),
				height: msg.Height,
			},
		}
		bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
		bcR.messagesForFSMCh <- msgForFSM

	case *bcproto.StatusResponse:
		// Got a peer status. Unverified.
		msgForFSM := bcReactorMessage{
			event: statusResponseEv,
			data: bReactorEventData{
				peerID: e.Src.ID(),
				height: msg.Height,
				length: msg.Size(),
			},
		}
		bcR.messagesForFSMCh <- msgForFSM

	default:
		bcR.Logger.Error(fmt.Sprintf("unknown message type %T", msg))
	}
}

func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
	msg := &bcproto.Message{}
	err := proto.Unmarshal(msgBytes, msg)
	if err != nil {
		panic(err)
	}
	uw, err := msg.Unwrap()
	if err != nil {
		panic(err)
	}
	bcR.ReceiveEnvelope(p2p.Envelope{
		ChannelID: chID,
		Src:       peer,
		Message:   uw,
	})
}

// processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel
func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) {

	processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
	doProcessBlockCh := make(chan struct{}, 1)

	lastHundred := time.Now()
	lastRate := 0.0

ForLoop:
	for {
		select {
		case <-stopProcessing:
			bcR.Logger.Info("finishing block execution")
			break ForLoop
		case <-processReceivedBlockTicker.C: // try to execute blocks
			select {
			case doProcessBlockCh <- struct{}{}:
			default:
			}
		case <-doProcessBlockCh:
			for {
				err := bcR.processBlock()
				if err == errMissingBlock {
					break
				}
				// Notify FSM of block processing result.
				msgForFSM := bcReactorMessage{
					event: processedBlockEv,
					data: bReactorEventData{
						err: err,
					},
				}
				_ = bcR.fsm.Handle(&msgForFSM)

				if err != nil {
					break
				}

				bcR.blocksSynced++
				if bcR.blocksSynced%100 == 0 {
					lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
					height, maxPeerHeight := bcR.fsm.Status()
					bcR.Logger.Info("Fast Sync Rate", "height", height,
						"max_peer_height", maxPeerHeight, "blocks/s", lastRate)
					lastHundred = time.Now()
				}
			}
		}
	}
}

// poolRoutine receives and handles messages from the Receive() routine and from the FSM.
func (bcR *BlockchainReactor) poolRoutine() {

	bcR.fsm.Start()

	sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond)
	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)

	stopProcessing := make(chan struct{}, 1)
	go bcR.processBlocksRoutine(stopProcessing)

ForLoop:
	for {
		select {

		case <-sendBlockRequestTicker.C:
			if !bcR.fsm.NeedsBlocks() {
				continue
			}
			_ = bcR.fsm.Handle(&bcReactorMessage{
				event: makeRequestsEv,
				data: bReactorEventData{
					maxNumRequests: maxNumRequests}})

		case <-statusUpdateTicker.C:
			// Ask for status updates.
			go bcR.sendStatusRequest()

		case msg := <-bcR.messagesForFSMCh:
			// Sent from the Receive() routine when status (statusResponseEv) and
			// block (blockResponseEv) response events are received
			_ = bcR.fsm.Handle(&msg)

		case msg := <-bcR.errorsForFSMCh:
			// Sent from the switch.RemovePeer() routine (RemovePeerEv) and
			// FSM state timer expiry routine (stateTimeoutEv).
			_ = bcR.fsm.Handle(&msg)

		case msg := <-bcR.eventsFromFSMCh:
			switch msg.event {
			case syncFinishedEv:
				stopProcessing <- struct{}{}
				// Sent from the FSM when it enters finished state.
				break ForLoop
			case peerErrorEv:
				// Sent from the FSM when it detects peer error
				bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID)
				if msg.data.err == errNoPeerResponse {
					// Sent from the peer timeout handler routine
					_ = bcR.fsm.Handle(&bcReactorMessage{
						event: peerRemoveEv,
						data: bReactorEventData{
							peerID: msg.data.peerID,
							err:    msg.data.err,
						},
					})
				}
				// else {
				// For slow peers, or errors due to blocks received from wrong peer
				// the FSM had already removed the peers
				// }
			default:
				bcR.Logger.Error("Event from FSM not supported", "type", msg.event)
			}

		case <-bcR.Quit():
			break ForLoop
		}
	}
}

func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) {
	peer := bcR.Switch.Peers().Get(peerID)
	if peer != nil {
		_ = bcR.swReporter.Report(behaviour.BadMessage(peerID, err.Error()))
	}
}

func (bcR *BlockchainReactor) processBlock() error {

	first, second, err := bcR.fsm.FirstTwoBlocks()
	if err != nil {
		// We need both to sync the first block.
		return err
	}

	chainID := bcR.initialState.ChainID

	firstParts := first.MakePartSet(types.BlockPartSizeBytes)
	firstPartSetHeader := firstParts.Header()
	firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
	// Finally, verify the first block using the second's commit
	// NOTE: we can probably make this more efficient, but note that calling
	// first.Hash() doesn't verify the tx contents, so MakePartSet() is
	// currently necessary.
	err = bcR.state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
	if err != nil {
		bcR.Logger.Error("error during commit verification", "err", err,
			"first", first.Height, "second", second.Height)
		return errBlockVerificationFailure
	}

	bcR.store.SaveBlock(first, firstParts, second.LastCommit)

	bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
	if err != nil {
		panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
	}

	return nil
}

// Implements bcRNotifier
// sendStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) sendStatusRequest() {
	bcR.Switch.BroadcastEnvelope(p2p.Envelope{
		ChannelID: BlockchainChannel,
		Message:   &bcproto.StatusRequest{},
	})
}

// Implements bcRNotifier
// BlockRequest sends `BlockRequest` height.
func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
	peer := bcR.Switch.Peers().Get(peerID)
	if peer == nil {
		return errNilPeerForBlockRequest
	}

	queued := p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
		ChannelID: BlockchainChannel,
		Message:   &bcproto.BlockRequest{Height: height},
	}, bcR.Logger)
	if !queued {
		return errSendQueueFull
	}
	return nil
}

// Implements bcRNotifier
func (bcR *BlockchainReactor) switchToConsensus() {
	conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
	if ok {
		conR.SwitchToConsensus(bcR.state, bcR.blocksSynced > 0 || bcR.stateSynced)
		bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv}
	}
	// else {
	// Should only happen during testing.
	// }
}

// Implements bcRNotifier
// Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out
// - FSM calls when:
//   - adding a block (addBlock) fails
//   - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
	bcR.Logger.Info("sendPeerError:", "peer", peerID, "error", err)
	msgData := bcFsmMessage{
		event: peerErrorEv,
		data: bFsmEventData{
			peerID: peerID,
			err:    err,
		},
	}
	bcR.eventsFromFSMCh <- msgData
}

// Implements bcRNotifier
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
	if timer == nil {
		panic("nil timer pointer parameter")
	}
	if *timer == nil {
		*timer = time.AfterFunc(timeout, func() {
			msg := bcReactorMessage{
				event: stateTimeoutEv,
				data: bReactorEventData{
					stateName: name,
				},
			}
			bcR.errorsForFSMCh <- msg
		})
	} else {
		(*timer).Reset(timeout)
	}
}
